The following steps implement incremental data loading using Change Tracking:
Step 1:
Configure SQL Server, Create the Table, and Enable Change Tracking
Connect to the existing on-premises SQL Server, open a new SQL script against the existing database, and run the statement below to enable change tracking at the database level.
ALTER DATABASE test SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
The TRACK_COLUMNS_UPDATED = ON setting, applied to the table in the script that follows, records extra information in the internal tracking table about which columns each UPDATE statement modified. This column-level information helps identify the columns changed by an UPDATE. The option is OFF by default to avoid extra storage overhead.
Create the Student table and enable change tracking on it:
CREATE TABLE [dbo].[Student](
[studentId] [int] IDENTITY(1,1) NOT NULL PRIMARY KEY,
[studentName] [varchar](100) NULL,
[stream] [varchar](50) NULL,
[marks] [int] NULL,
[createDate] [datetime2] NULL,
[updateDate] [datetime2] NULL
) ON [PRIMARY]
GO
ALTER TABLE [dbo].[Student]
ENABLE CHANGE_TRACKING
WITH (TRACK_COLUMNS_UPDATED = ON)
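Before moving on, it can help to confirm that change tracking is actually enabled at both levels. The queries below are a minimal sketch that reads the SQL Server catalog views sys.change_tracking_databases and sys.change_tracking_tables; the column aliases are illustrative and not part of the original walkthrough.

```sql
-- Database-level settings: one row per database that has change tracking enabled
SELECT DB_NAME(database_id) AS databaseName,
       is_auto_cleanup_on,
       retention_period,
       retention_period_units_desc
FROM sys.change_tracking_databases;

-- Table-level settings for the current database
SELECT OBJECT_SCHEMA_NAME(object_id) AS schemaName,
       OBJECT_NAME(object_id)        AS tableName,
       is_track_columns_updated_on,
       min_valid_version
FROM sys.change_tracking_tables;
```

If dbo.Student does not appear in the second result set, the ALTER TABLE statement above has not taken effect in the database you are connected to.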
Step 2:
Create and Populate the ChangeTrackingStore Table
ChangeTrackingStore maintains the change tracking version information for the Student table. Information for multiple tables can also be maintained here. The CHANGE_TRACKING_CURRENT_VERSION() function is used to retrieve the change tracking version associated with the last committed transaction of the database.
-- create the ChangeTrackingStore table
create table dbo.ChangeTrackingStore
(
tableName varchar(255),
SYS_CHANGE_VERSION BIGINT
);
-- retrieve the latest value from CHANGE_TRACKING_CURRENT_VERSION()
DECLARE @changeTrackingVersion BIGINT
SET @changeTrackingVersion = CHANGE_TRACKING_CURRENT_VERSION();
-- populate data in ChangeTrackingStore for Student table
INSERT INTO dbo.ChangeTrackingStore
VALUES ('Student', @changeTrackingVersion)
-- select data from ChangeTrackingStore table
SELECT tableName,SYS_CHANGE_VERSION
FROM dbo.ChangeTrackingStore
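Because Step 1 keeps change data for only two days, a stored version can become too old to use if the pipeline does not run for a while. The following is a sketch of a validity check using the built-in CHANGE_TRACKING_MIN_VALID_VERSION function; the variable names and messages are illustrative assumptions, and the check is not part of the original pipeline.

```sql
-- Version saved by the last successful load
DECLARE @storedVersion BIGINT =
    (SELECT SYS_CHANGE_VERSION
     FROM dbo.ChangeTrackingStore
     WHERE tableName = 'Student');

-- Oldest version for which change data is still retained for dbo.Student
DECLARE @minValidVersion BIGINT =
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('dbo.Student'));

IF @storedVersion < @minValidVersion
    PRINT 'Stored version is older than the retained change data; a full reload is needed.';
ELSE
    PRINT 'Stored version is still valid for an incremental load.';
```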
Step 3:
Insert Data in the Student Table
INSERT INTO dbo.Student (studentName,stream,marks,createDate,updateDate)
VALUES
('aaa','CSE',94,GETDATE(), GETDATE()),
('bbb','CSE',95,GETDATE(), GETDATE())
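To see what change tracking recorded for these inserts, the CHANGETABLE function can be queried directly on the source database. This is a small sketch that assumes the ChangeTrackingStore row from Step 2 exists; the variable name @lastVersion is hypothetical.

```sql
-- Start from the version saved in Step 2, before the rows were inserted
DECLARE @lastVersion BIGINT =
    (SELECT SYS_CHANGE_VERSION
     FROM dbo.ChangeTrackingStore
     WHERE tableName = 'Student');

-- One row per changed primary key since @lastVersion
SELECT c.studentId,
       c.SYS_CHANGE_VERSION,
       c.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES dbo.Student, @lastVersion) AS c
ORDER BY c.SYS_CHANGE_VERSION;
```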
Step 4:
Create the Staging and Final Tables at the Azure End
The staging table contains the Student table columns plus extra columns returned by the CHANGETABLE function. CHANGETABLE returns change metadata together with the primary key of each changed row, which is why a primary key is a prerequisite for enabling change tracking on a table. The following columns returned by CHANGETABLE are stored in the staging table:
- SYS_CHANGE_VERSION: the version value associated with the last change to the row.
- SYS_CHANGE_OPERATION: the type of change: U (Update), I (Insert), or D (Delete).
- SYS_CHANGE_COLUMNS: a binary value identifying the columns changed since last_sync_version; it is populated only for update operations.
-- create staging table for Student
CREATE TABLE [dbo].[stgStudentCt](
[studentId] [int] NULL,
[studentName] [varchar](100) NULL,
[stream] [varchar](50) NULL,
[marks] [int] NULL,
[createDate] [datetime] NULL,
[updateDate] [datetime] NULL,
SYS_CHANGE_VERSION BIGINT,
SYS_CHANGE_OPERATION CHAR(5),
SYS_CHANGE_COLUMNS VARBINARY(50)
) ON [PRIMARY]
GO
-- create the final table Student again
CREATE TABLE [dbo].[Student](
[studentId] [int] NOT NULL,
[studentName] [varchar](100) NULL,
[stream] [varchar](50) NULL,
[marks] [int] NULL,
[createDate] [datetime] NULL,
[updateDate] [datetime] NULL
) ON [PRIMARY]
GO
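The SYS_CHANGE_COLUMNS value is a binary mask rather than a readable column list. As a sketch of how it can be decoded, the query below runs on the on-premises source database and uses the built-in CHANGE_TRACKING_IS_COLUMN_IN_MASK function to test whether the marks column was part of each update. The choice of the marks column and the alias marksChanged are illustrative assumptions.

```sql
DECLARE @lastVersion BIGINT =
    (SELECT SYS_CHANGE_VERSION
     FROM dbo.ChangeTrackingStore
     WHERE tableName = 'Student');

-- marksChanged is 1 when the marks column was modified by the tracked update
SELECT c.studentId,
       c.SYS_CHANGE_OPERATION,
       CHANGE_TRACKING_IS_COLUMN_IN_MASK(
           COLUMNPROPERTY(OBJECT_ID('dbo.Student'), 'marks', 'ColumnId'),
           c.SYS_CHANGE_COLUMNS) AS marksChanged
FROM CHANGETABLE(CHANGES dbo.Student, @lastVersion) AS c
WHERE c.SYS_CHANGE_OPERATION = 'U';
```

The mask is only meaningful when TRACK_COLUMNS_UPDATED is ON for the table, as configured in Step 1.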
Step 5:
Create a Self-Hosted Integration Runtime and Azure Integration Runtime
Step 6:
Create a Linked Service for On-Premises SQL Server and Azure SQL Database
The linked service helps to link the source data store to the Data Factory. A Linked Service is similar to a connection string, as it defines the connection information required for the Data Factory to connect to the external data source.
Step 7:
Create a dataset named SqlServerTable1 for the dbo.Student table in the on-premises SQL Server database, and a dataset named SqlServerTable2 for the ChangeTrackingStore table in the same database.
Step 8:
Create a Student dataset and a staging dataset for Azure.
Step 9:
Select the Author & Monitor option in the created Azure Data Factory and create a new pipeline. The overall pipeline consists of two Lookup activities, a Copy data activity, and two Stored Procedure activities, built in the following steps.
Step 10:
Create the Lookup Activities
Create the first Lookup activity, named lkupLastChngTrackVer. Its source dataset is SqlServerTable2, which points to the dbo.ChangeTrackingStore table.
Create the second Lookup activity, named lkupCurChngTrackVer, with the source dataset set to SqlServerTable1, which points to the dbo.Student table in the on-premises SQL Server. Select the 'First Row Only' checkbox and enter the following query to retrieve the change tracking version associated with the last committed transaction of the database.
SELECT CHANGE_TRACKING_CURRENT_VERSION() AS CurrentChangeTrackingVersion
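The first Lookup activity reads the ChangeTrackingStore table directly. If that table later holds versions for several tables, as Step 2 allows, the lookup would need to return the Student row specifically. One possible query for that case, offered as an assumption rather than as the setting used in the original pipeline, is:

```sql
-- Return only the row for the Student table so that firstRow is unambiguous
SELECT tableName,
       SYS_CHANGE_VERSION
FROM dbo.ChangeTrackingStore
WHERE tableName = 'Student';
```

The column names match the firstRow.SYS_CHANGE_VERSION and firstRow.tableName expressions used in the later steps.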
Step 11:
Create a Copy Activity
Create the Copy data activity and connect the outputs of the two Lookup activities to it as inputs. In the Source tab, set the source dataset to SqlServerTable1 and enter the following query. It performs a right outer join from Student to CHANGETABLE, so every changed row is returned with its change operation and change version, including deleted rows that no longer exist in Student.
SELECT c.studentId, s.studentName, s.stream, s.marks, s.createDate, s.updateDate, c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, c.SYS_CHANGE_COLUMNS
FROM Student AS s
RIGHT OUTER JOIN CHANGETABLE(CHANGES Student, @{activity('lkupLastChngTrackVer').output.firstRow.SYS_CHANGE_VERSION}) AS c
ON s.[studentId] = c.[studentId]
WHERE c.SYS_CHANGE_VERSION <= @{activity('lkupCurChngTrackVer').output.firstRow.CurrentChangeTrackingVersion}
In the Sink tab, select stagingDataset as the sink dataset and add the following pre-copy script, which truncates the staging table stgStudentCt before each load.
TRUNCATE TABLE [dbo].[stgStudentCt]
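The source query can be tried outside Data Factory by replacing the two pipeline expressions with local variables. The script below is a sketch for running in a query window on the source database; @lastVersion and @currentVersion are hypothetical names standing in for the outputs of the two Lookup activities.

```sql
-- Stand-in for lkupLastChngTrackVer
DECLARE @lastVersion BIGINT =
    (SELECT SYS_CHANGE_VERSION
     FROM dbo.ChangeTrackingStore
     WHERE tableName = 'Student');

-- Stand-in for lkupCurChngTrackVer
DECLARE @currentVersion BIGINT = CHANGE_TRACKING_CURRENT_VERSION();

-- Same shape as the Copy activity source query
SELECT c.studentId, s.studentName, s.stream, s.marks, s.createDate, s.updateDate,
       c.SYS_CHANGE_VERSION, c.SYS_CHANGE_OPERATION, c.SYS_CHANGE_COLUMNS
FROM dbo.Student AS s
RIGHT OUTER JOIN CHANGETABLE(CHANGES dbo.Student, @lastVersion) AS c
    ON s.studentId = c.studentId
WHERE c.SYS_CHANGE_VERSION <= @currentVersion;
```

Rows with SYS_CHANGE_OPERATION = 'D' should show NULL in every column taken from Student, since only the key survives in CHANGETABLE.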
Step 12:
Create the Stored Procedure Activities
Create a Stored Procedure activity after the Copy data activity. Select the linked service AzureSqlDatabase1 and the stored procedure usp_Modify_Student. This stored procedure deletes from the Azure SQL Student table the records that were deleted from the source Student table since the last data load. It also inserts and updates records in the Student table from the staging table stgStudentCt, based on the SYS_CHANGE_OPERATION column value. The stored procedure is as follows:
CREATE PROCEDURE dbo.usp_Modify_Student
AS
BEGIN
BEGIN TRY
BEGIN TRANSACTION
DELETE FROM dbo.Student
FROM dbo.Student AS S
INNER JOIN dbo.stgStudentCt AS Ct ON S.studentId = Ct.studentId
WHERE Ct.SYS_CHANGE_OPERATION = 'D'
INSERT INTO dbo.Student (studentid, studentName,stream,marks,createDate,updateDate)
SELECT studentid, studentName,stream,marks,createDate,updateDate
FROM dbo.stgStudentCt
WHERE SYS_CHANGE_OPERATION = 'I'
UPDATE dbo.Student
SET studentName = Ct.studentName,
stream = Ct.stream,
marks = Ct.marks,
createDate = Ct.createDate,
updateDate = Ct.updateDate
FROM dbo.Student AS S
INNER JOIN dbo.stgStudentCt AS Ct ON S.studentId = Ct.studentId
WHERE Ct.SYS_CHANGE_OPERATION = 'U'
COMMIT TRANSACTION
END TRY
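BEGIN CATCH
-- roll back only if a transaction is still open, then re-raise the error so the calling activity sees the failure
IF @@TRANCOUNT > 0 ROLLBACK TRANSACTION;
THROW;
END CATCH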
RETURN 0
END
GO
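Before the procedure runs, the staging table can be inspected to see what it is about to apply. The queries below are a sketch for the Azure SQL Database side; the aliases are illustrative. The second query lists staged updates whose key is missing from the target, which the UPDATE in usp_Modify_Student would skip because of its inner join.

```sql
-- How many rows of each operation type were staged by the Copy activity
SELECT SYS_CHANGE_OPERATION,
       COUNT(*) AS rowsStaged
FROM dbo.stgStudentCt
GROUP BY SYS_CHANGE_OPERATION;

-- Staged updates with no matching row in the target table
SELECT Ct.studentId
FROM dbo.stgStudentCt AS Ct
LEFT JOIN dbo.Student AS S
    ON S.studentId = Ct.studentId
WHERE Ct.SYS_CHANGE_OPERATION = 'U'
  AND S.studentId IS NULL;
```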
Create the second Stored Procedure activity. Select the linked service, sourceSQL, and stored procedure, uspUpdateChangeTrackingStore. The purpose of this stored procedure is to update the SYS_CHANGE_VERSION column value of ChangeTrackingStore table with the current value of change tracking version.
This procedure takes two parameters: changeTrackingVersion and tableName.
CREATE PROCEDURE dbo.uspUpdateChangeTrackingStore @changeTrackingVersion BIGINT, @tableName varchar(255)
AS
BEGIN
UPDATE dbo.ChangeTrackingStore
SET SYS_CHANGE_VERSION = @changeTrackingVersion
WHERE tableName = @tableName
END
--parameter values
changeTrackingVersion -- @{activity('lkupCurChngTrackVer').output.firstRow.CurrentChangeTrackingVersion}
tableName -- @{activity('lkupLastChngTrackVer').output.firstRow.tableName}
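Outside the pipeline, the same procedure can be exercised by hand on the source database. This is a minimal sketch with a hypothetical variable name; note that running it advances the stored version, so any changes not yet copied would be skipped by the next pipeline run.

```sql
-- Stand-in for the value passed from lkupCurChngTrackVer
DECLARE @currentVersion BIGINT = CHANGE_TRACKING_CURRENT_VERSION();

EXEC dbo.uspUpdateChangeTrackingStore
     @changeTrackingVersion = @currentVersion,
     @tableName = 'Student';

-- Confirm the stored version was updated
SELECT tableName, SYS_CHANGE_VERSION
FROM dbo.ChangeTrackingStore
WHERE tableName = 'Student';
```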
Step 13:
Debug the pipeline for the initial load and check the data in the target Student table. All the records inserted in the source table are present in the staging table with SYS_CHANGE_OPERATION as 'I'.
Modify some data on-premises, debug the pipeline again for the incremental load, and check the data in Azure SQL Database.
stgStudentCt has entries for the inserted, updated, and deleted records. For a deleted record, only the studentId column has a value and the remaining Student table columns contain NULL, because the record no longer exists in the Student table in the on-premises SQL Server database, but its primary key is still returned by CHANGETABLE.
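For the incremental test, one change of each type is enough to exercise all three branches of usp_Modify_Student. The statements below are a sketch to run on the on-premises database; the new marks value and the 'ccc' row are made-up test data, while 'aaa' and 'bbb' are the rows inserted in Step 3.

```sql
-- Update: should arrive in staging with SYS_CHANGE_OPERATION = 'U'
UPDATE dbo.Student
SET marks = 96,
    updateDate = GETDATE()
WHERE studentName = 'aaa';

-- Delete: should arrive with 'D' and NULL in the non-key columns
DELETE FROM dbo.Student
WHERE studentName = 'bbb';

-- Insert: should arrive with 'I'
INSERT INTO dbo.Student (studentName, stream, marks, createDate, updateDate)
VALUES ('ccc', 'ECE', 90, GETDATE(), GETDATE());
```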
Conclusion:
When data is transferred from a source to a target data store, incremental loading is almost always required. The Change Tracking feature is available in SQL Server 2019 and Azure SQL Database, so building an incremental load is straightforward and not very time consuming. The change tracking mechanism identifies the inserted, updated, and deleted records in the source table so that the same changes can be applied to the target table.